{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Concurrent Agents\n",
    "\n",
    "In this section, we explore the use of multiple agents working concurrently. We cover three main patterns:\n",
    "\n",
    "1. **Single Message & Multiple Processors**  \n",
    "   Demonstrates how a single message can be processed by multiple agents subscribed to the same topic simultaneously.\n",
    "\n",
    "2. **Multiple Messages & Multiple Processors**  \n",
    "   Illustrates how specific message types can be routed to dedicated agents based on topics.\n",
    "\n",
    "3. **Direct Messaging**  \n",
    "   Focuses on sending messages between agents and from the runtime to agents."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [],
   "source": [
    "import asyncio\n",
    "from dataclasses import dataclass\n",
    "\n",
    "from autogen_core import (\n",
    "    AgentId,\n",
    "    ClosureAgent,\n",
    "    ClosureContext,\n",
    "    DefaultTopicId,\n",
    "    MessageContext,\n",
    "    RoutedAgent,\n",
    "    SingleThreadedAgentRuntime,\n",
    "    TopicId,\n",
    "    TypeSubscription,\n",
    "    default_subscription,\n",
    "    message_handler,\n",
    "    type_subscription,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "@dataclass\n",
    "class Task:\n",
    "    task_id: str\n",
    "\n",
    "\n",
    "@dataclass\n",
    "class TaskResponse:\n",
    "    task_id: str\n",
    "    result: str"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Single Message & Multiple Processors\n",
    "The first pattern shows how a single message can be processed by multiple agents simultaneously:\n",
    "\n",
    "- Each `Processor` agent subscribes to the default topic using the {py:meth}`~autogen_core.components.default_subscription` decorator.\n",
    "- When publishing a message to the default topic, all registered agents will process the message independently.\n",
    "```{note}\n",
    "Below, we are subscribing `Processor` using the {py:meth}`~autogen_core.components.default_subscription` decorator, there's an alternative way to subscribe an agent without using decorators altogether as shown in [Subscribe and Publish to Topics](../framework/message-and-communication.ipynb#subscribe-and-publish-to-topics), this way the same agent class can be subscribed to different topics.\n",
    "```\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "@default_subscription\n",
    "class Processor(RoutedAgent):\n",
    "    @message_handler\n",
    "    async def on_task(self, message: Task, ctx: MessageContext) -> None:\n",
    "        print(f\"{self._description} starting task {message.task_id}\")\n",
    "        await asyncio.sleep(2)  # Simulate work\n",
    "        print(f\"{self._description} finished task {message.task_id}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Agent 1 starting task task-1\n",
      "Agent 2 starting task task-1\n",
      "Agent 1 finished task task-1\n",
      "Agent 2 finished task task-1\n"
     ]
    }
   ],
   "source": [
    "runtime = SingleThreadedAgentRuntime()\n",
    "\n",
    "await Processor.register(runtime, \"agent_1\", lambda: Processor(\"Agent 1\"))\n",
    "await Processor.register(runtime, \"agent_2\", lambda: Processor(\"Agent 2\"))\n",
    "\n",
    "runtime.start()\n",
    "\n",
    "await runtime.publish_message(Task(task_id=\"task-1\"), topic_id=DefaultTopicId())\n",
    "\n",
    "await runtime.stop_when_idle()"
   ]
  },
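  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The note above mentions subscribing an agent without decorators. A minimal sketch of that alternative follows, assuming the `Task` message and the imports from the earlier cells; `PlainProcessor` and the agent type `\"plain_processor\"` are hypothetical names used only for illustration.\n",
    "\n",
    "```python\n",
    "class PlainProcessor(RoutedAgent):  # hypothetical: same handler, no subscription decorator\n",
    "    @message_handler\n",
    "    async def on_task(self, message: Task, ctx: MessageContext) -> None:\n",
    "        print(f\"{self.id} received task {message.task_id}\")\n",
    "\n",
    "\n",
    "runtime = SingleThreadedAgentRuntime()\n",
    "await PlainProcessor.register(runtime, \"plain_processor\", lambda: PlainProcessor(\"Plain Processor\"))\n",
    "\n",
    "# Explicitly subscribe the registered agent type to the default topic type.\n",
    "await runtime.add_subscription(TypeSubscription(topic_type=\"default\", agent_type=\"plain_processor\"))\n",
    "\n",
    "runtime.start()\n",
    "await runtime.publish_message(Task(task_id=\"task-1\"), topic_id=DefaultTopicId())\n",
    "await runtime.stop_when_idle()\n",
    "```\n",
    "\n",
    "Because the subscription is attached to the registered agent type rather than to the class, the same class could be registered under another agent type and subscribed to a different topic."
   ]
  },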
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Multiple messages & Multiple Processors\n",
    "Second, this pattern demonstrates routing different types of messages to specific processors:\n",
    "- `UrgentProcessor` subscribes to the \"urgent\" topic\n",
    "- `NormalProcessor` subscribes to the \"normal\" topic\n",
    "\n",
    "We make an agent subscribe to a specific topic type using the {py:meth}`~autogen_core.components.type_subscription` decorator."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 50,
   "metadata": {},
   "outputs": [],
   "source": [
    "TASK_RESULTS_TOPIC_TYPE = \"task-results\"\n",
    "task_results_topic_id = TopicId(type=TASK_RESULTS_TOPIC_TYPE, source=\"default\")\n",
    "\n",
    "\n",
    "@type_subscription(topic_type=\"urgent\")\n",
    "class UrgentProcessor(RoutedAgent):\n",
    "    @message_handler\n",
    "    async def on_task(self, message: Task, ctx: MessageContext) -> None:\n",
    "        print(f\"Urgent processor starting task {message.task_id}\")\n",
    "        await asyncio.sleep(1)  # Simulate work\n",
    "        print(f\"Urgent processor finished task {message.task_id}\")\n",
    "\n",
    "        task_response = TaskResponse(task_id=message.task_id, result=\"Results by Urgent Processor\")\n",
    "        await self.publish_message(task_response, topic_id=task_results_topic_id)\n",
    "\n",
    "\n",
    "@type_subscription(topic_type=\"normal\")\n",
    "class NormalProcessor(RoutedAgent):\n",
    "    @message_handler\n",
    "    async def on_task(self, message: Task, ctx: MessageContext) -> None:\n",
    "        print(f\"Normal processor starting task {message.task_id}\")\n",
    "        await asyncio.sleep(3)  # Simulate work\n",
    "        print(f\"Normal processor finished task {message.task_id}\")\n",
    "\n",
    "        task_response = TaskResponse(task_id=message.task_id, result=\"Results by Normal Processor\")\n",
    "        await self.publish_message(task_response, topic_id=task_results_topic_id)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "After registering the agents, we can publish messages to the \"urgent\" and \"normal\" topics:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 51,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Normal processor starting task normal-1\n",
      "Urgent processor starting task urgent-1\n",
      "Urgent processor finished task urgent-1\n",
      "Normal processor finished task normal-1\n"
     ]
    }
   ],
   "source": [
    "runtime = SingleThreadedAgentRuntime()\n",
    "\n",
    "await UrgentProcessor.register(runtime, \"urgent_processor\", lambda: UrgentProcessor(\"Urgent Processor\"))\n",
    "await NormalProcessor.register(runtime, \"normal_processor\", lambda: NormalProcessor(\"Normal Processor\"))\n",
    "\n",
    "runtime.start()\n",
    "\n",
    "await runtime.publish_message(Task(task_id=\"normal-1\"), topic_id=TopicId(type=\"normal\", source=\"default\"))\n",
    "await runtime.publish_message(Task(task_id=\"urgent-1\"), topic_id=TopicId(type=\"urgent\", source=\"default\"))\n",
    "\n",
    "await runtime.stop_when_idle()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Collecting Results"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In the previous example, we relied on console printing to verify task completion. However, in real applications, we typically want to collect and process the results programmatically.\n",
    "\n",
    "To collect these messages, we'll use a {py:class}`~autogen_core.components.ClosureAgent`. We've defined a dedicated topic `TASK_RESULTS_TOPIC_TYPE` where both `UrgentProcessor` and `NormalProcessor` publish their results. The ClosureAgent will then process messages from this topic."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 52,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Normal processor starting task normal-1\n",
      "Urgent processor starting task urgent-1\n",
      "Urgent processor finished task urgent-1\n",
      "Normal processor finished task normal-1\n"
     ]
    }
   ],
   "source": [
    "queue = asyncio.Queue[TaskResponse]()\n",
    "\n",
    "\n",
    "async def collect_result(_agent: ClosureContext, message: TaskResponse, ctx: MessageContext) -> None:\n",
    "    await queue.put(message)\n",
    "\n",
    "\n",
    "runtime.start()\n",
    "\n",
    "CLOSURE_AGENT_TYPE = \"collect_result_agent\"\n",
    "await ClosureAgent.register_closure(\n",
    "    runtime,\n",
    "    CLOSURE_AGENT_TYPE,\n",
    "    collect_result,\n",
    "    subscriptions=lambda: [TypeSubscription(topic_type=TASK_RESULTS_TOPIC_TYPE, agent_type=CLOSURE_AGENT_TYPE)],\n",
    ")\n",
    "\n",
    "await runtime.publish_message(Task(task_id=\"normal-1\"), topic_id=TopicId(type=\"normal\", source=\"default\"))\n",
    "await runtime.publish_message(Task(task_id=\"urgent-1\"), topic_id=TopicId(type=\"urgent\", source=\"default\"))\n",
    "\n",
    "await runtime.stop_when_idle()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 53,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "TaskResponse(task_id='urgent-1', result='Results by Urgent Processor')\n",
      "TaskResponse(task_id='normal-1', result='Results by Normal Processor')\n"
     ]
    }
   ],
   "source": [
    "while not queue.empty():\n",
    "    print(await queue.get())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Direct Messages\n",
    "\n",
    "In contrast to the previous patterns, this pattern focuses on direct messages. Here we demonstrate two ways to send them:\n",
    "\n",
    "- Direct messaging between agents  \n",
    "- Sending messages from the runtime to specific agents  \n",
    "\n",
    "Things to consider in the example below:\n",
    "\n",
    "- Messages are addressed using the {py:class}`~autogen_core.components.AgentId`.  \n",
    "- The sender can expect to receive a response from the target agent.  \n",
    "- We register the `WorkerAgent` class only once; however, we send tasks to two different workers.\n",
    "    - How? As stated in [Agent lifecycle](../core-concepts/agent-identity-and-lifecycle.md#agent-lifecycle), when delivering a message using an {py:class}`~autogen_core.components.AgentId`, the runtime will either fetch the instance or create one if it doesn't exist. In this case, the runtime creates two instances of workers when sending those two messages."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {},
   "outputs": [],
   "source": [
    "class WorkerAgent(RoutedAgent):\n",
    "    @message_handler\n",
    "    async def on_task(self, message: Task, ctx: MessageContext) -> TaskResponse:\n",
    "        print(f\"{self.id} starting task {message.task_id}\")\n",
    "        await asyncio.sleep(2)  # Simulate work\n",
    "        print(f\"{self.id} finished task {message.task_id}\")\n",
    "        return TaskResponse(task_id=message.task_id, result=f\"Results by {self.id}\")\n",
    "\n",
    "\n",
    "class DelegatorAgent(RoutedAgent):\n",
    "    def __init__(self, description: str, worker_type: str):\n",
    "        super().__init__(description)\n",
    "        self.worker_instances = [AgentId(worker_type, f\"{worker_type}-1\"), AgentId(worker_type, f\"{worker_type}-2\")]\n",
    "\n",
    "    @message_handler\n",
    "    async def on_task(self, message: Task, ctx: MessageContext) -> TaskResponse:\n",
    "        print(f\"Delegator received task {message.task_id}.\")\n",
    "\n",
    "        subtask1 = Task(task_id=\"task-part-1\")\n",
    "        subtask2 = Task(task_id=\"task-part-2\")\n",
    "\n",
    "        worker1_result, worker2_result = await asyncio.gather(\n",
    "            self.send_message(subtask1, self.worker_instances[0]), self.send_message(subtask2, self.worker_instances[1])\n",
    "        )\n",
    "\n",
    "        combined_result = f\"Part 1: {worker1_result.result}, \" f\"Part 2: {worker2_result.result}\"\n",
    "        task_response = TaskResponse(task_id=message.task_id, result=combined_result)\n",
    "        return task_response"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Delegator received task main-task.\n",
      "worker/worker-1 starting task task-part-1\n",
      "worker/worker-2 starting task task-part-2\n",
      "worker/worker-1 finished task task-part-1\n",
      "worker/worker-2 finished task task-part-2\n",
      "Final result: Part 1: Results by worker/worker-1, Part 2: Results by worker/worker-2\n"
     ]
    }
   ],
   "source": [
    "runtime = SingleThreadedAgentRuntime()\n",
    "\n",
    "await WorkerAgent.register(runtime, \"worker\", lambda: WorkerAgent(\"Worker Agent\"))\n",
    "await DelegatorAgent.register(runtime, \"delegator\", lambda: DelegatorAgent(\"Delegator Agent\", \"worker\"))\n",
    "\n",
    "runtime.start()\n",
    "\n",
    "delegator = AgentId(\"delegator\", \"default\")\n",
    "response = await runtime.send_message(Task(task_id=\"main-task\"), recipient=delegator)\n",
    "\n",
    "print(f\"Final result: {response.result}\")\n",
    "await runtime.stop_when_idle()"
   ]
  },
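  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To illustrate the lifecycle point above, the following sketch sends a task to a worker key that has not been used yet. It assumes the `runtime` and the `\"worker\"` registration from the previous cell; the key `\"worker-3\"` and the task ID `\"extra-task\"` are hypothetical. If the lifecycle behaves as described, the runtime should create a third `WorkerAgent` instance on demand.\n",
    "\n",
    "```python\n",
    "runtime.start()  # the previous cell stopped the runtime\n",
    "\n",
    "# \"worker-3\" is a hypothetical key that no earlier message has used.\n",
    "extra_worker = AgentId(\"worker\", \"worker-3\")\n",
    "response = await runtime.send_message(Task(task_id=\"extra-task\"), recipient=extra_worker)\n",
    "print(response.result)\n",
    "\n",
    "await runtime.stop_when_idle()\n",
    "```\n",
    "\n",
    "No additional registration is needed here, because the agent type `\"worker\"` is already registered and only the key differs."
   ]
  },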
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Additional Resources"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If you're interested in more about concurrent processing, check out the [Mixture of Agents](./mixture-of-agents.ipynb) pattern, which relies heavily on concurrent agents."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "autogen",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
